delayed_job

delayed_job用来处理rails中的异步延时任务,支持发送大量实时通讯、改变图片尺寸、http下载等等。

例如, 我要在controller中,发送10万封邮件。 这个操作要耗时3个小时。我们不能这样写:

def send_emails
  (1..1000000).each do |n|
    ## 开始处理发送邮件。
  end
  redirect_to ...
end

这样写的话,是不行的。 每次这个action都会timeout。

对于这个情况,我们就要让程序在后台运行。

安装

在Active Record中使用,直接在Gemfile中加入

gem 'delayed_job_active_record'

运行bundle install安装后端和delayed_job gem。 Active Record后端需要一个任务表,创建命令:

$ rails generate delayed_job:active_record
$ rake db:migrate

就会生成一个migration:


class CreateDelayedJobs < ActiveRecord::Migration
  def self.up
    create_table :delayed_jobs, force: true do |table|
      table.integer :priority, default: 0, null: false # Allows some jobs to jump to the front of the queue
      table.integer :attempts, default: 0, null: false # Provides for retries, but still fail eventually.
      table.text :handler,                 null: false # YAML-encoded string of the object that will do work
      table.text :last_error                           # reason for last failure (See Note below)
      table.datetime :run_at                           # When to run. Could be Time.zone.now for immediately, or sometime in the future.
      table.datetime :locked_at                        # Set when a client is working on this object
      table.datetime :failed_at                        # Set when all retries have failed (actually, by default, the record is deleted instead)
      table.string :locked_by                          # Who is working on this object (if locked)
      table.string :queue                              # The name of the queue this job is in
      table.timestamps null: true
    end

    add_index :delayed_jobs, [:priority, :run_at], name: "delayed_jobs_priority"
  end

  def self.down
    drop_table :delayed_jobs
  end
end

如果是rails 4.2,则需要在config/application.rb中配置queue_adapter

# config/application.rb
module YourApp
  class Application < Rails::Application
    # Be sure to have the adapter's gem in your Gemfile and follow
    # the adapter's specific installation and deployment instructions.
    config.active_job.queue_adapter = :delayed_job
  end
end

如果在Rails 4.x中使用了protected_attributes gem,必须将gem protected_attributes放在delayed_job前面,不然会报以下错误:

ActiveRecord::StatementInvalid: PG::NotNullViolation: ERROR: null value in column "handler" violates not-null constraint

开发模式(development)

在development环境下,如果rails的版本大于3.1,每当执行完100个任务或者任务完成时,程序代码会自动加载,所以在development环境更改代码后不用重启任务。

任务队列

在任何对象中调用.delay.method(params),它将在后台进行处理

#不使用延时任务
@user.activate!(@device)

#使用延时任务
@user.delay.activate!(@device)

使用了 delay之后, 所有的任务,都要:

  1. 写入到特定的“任务表”中。
  2. 我们要运行一些“工人(worker)“, 来执行这些任务。
class User < ActiveRecord::Base
  def eat
    Rails.logger.info "== begin eating ..."
    sleep 2
    Rails.logger.info "== finished ..."
  end
end
rails console:
> User.first.delay.eat
$ bundle exec script/delayed_job -n 2 start

表示,同时有两个worker 在干活儿。这两个worker, 会紧盯着“任务表”, 一有任务,马上开始执行。

kaikai:graduate$ bundle exec ruby bin/delayed_job -n 2 start
delayed_job.0: process with pid 31047 started.
delayed_job.1: process with pid 31053 started.
  1. 看日志。就会发现, 这些worker 在不间断的工作。
# 表示 worker 开始认领了这个任务,开始执行了。
# 第一步。 认领该任务。并且,把该任务的状态,变成: locked.
17:44:52 DEBUG:   Delayed::Backend::ActiveRecord::Job Load (0.5ms)  SELECT  `delayed_jobs`.* FROM `delayed_jobs` WHERE `delayed_jobs`.`locked_by` = 'delayed_job.1 host:kaikai pid:31053' AND `delayed_jobs`.`locked_at` = '2016-11-03 09:44:52' AND `delayed_jobs`.`failed_at` IS NULL  ORDER BY `delayed_jobs`.`id` ASC LIMIT 1
# 第二步。 开始执行了。
17:44:52 INFO: == begin eating ...
17:44:52 DEBUG:   User Load (0.3ms)  SELECT  `users`.* FROM `users` WHERE `users`.`id` = 1 LIMIT 1
# 这个是delayed job的日志
17:44:52 INFO: 2016-11-03T17:44:52+0800: [Worker(delayed_job.1 host:kaikai pid:31053)] Job User#eat (id=2) RUNNING
17:44:54 INFO: == finished ...
# 第三步。 执行完毕,从delay_jobs (任务表)中,删掉干才干完的任务。
17:44:54 INFO: 2016-11-03T17:44:54+0800: [Worker(delayed_job.0 host:kaikai pid:31047)] Job User#eat (id=1) COMPLETED after 2.2580
# 因为它干完一个任务就删掉一个, 所以,当delay_jobs 表中,没有任何记录的时候,就是活儿干完
的时候。

如果一个方法需要一直在后台运行,可以在方法声明之后,调用handle_asynchronously

class Device
  def deliver
    #长时间运行方法
  end
  handle_asynchronously :deliver
end

device = Device.new
device.deliver

参数

handle_asynchronouslydelay有下列参数:

  • :priority (number):数值越小越先执行,默认为0,但可以重新配置
  • :run_at (time):在这个时间后执行任务
  • :queue(string):任务队列名,是priority的另一种选择
class LongTasks
  def send_mailer
    # Some other code
  end
  handle_asynchronously :send_mailer, :priority => 20

  def in_the_future
    # Some other code
  end
  #五分钟后将执行(5.minutes.from_now)
  handle_asynchronously :in_the_future, :run_at => Proc.new { 5.minutes.from_now }

  def self.when_to_run
    2.hours.from_now
  end

  class << self
    def call_a_class_method
      # Some other code
    end
    handle_asynchronously :call_a_class_method, :run_at => Proc.new { when_to_run }
  end

  attr_reader :how_important

  def call_an_instance_method
    # Some other code
  end
  handle_asynchronously :call_an_instance_method, :priority => Proc.new {|i| i.how_important }
end

如果在调试的时候要调用一个handle_asynchronously的方法,但是不想延迟执行,这时候可以在方法名后面加上_without_delay,例如原方法名叫做foo,立即执行为foo_without_delay

运行任务

script/delayed_job能够用来管理后来进程,开始任务。 添加gem "daemons"到Gemfile,并且执行rails generate delayed_job`

RAILS_ENV=production script/delayed_job start
RAILS_ENV=production script/delayed_job stop

# 在不同的进程上执行两个worker
RAILS_ENV=production script/delayed_job -n 2 start
RAILS_ENV=production script/delayed_job stop

# 使用--queue或者--queues选项来指定不同的队列
RAILS_ENV=production script/delayed_job --queue=tracking start
RAILS_ENV=production script/delayed_job --queues=mailers,tasks start

#使用--pool选项来指定worker pool,可以多次使用这个选项来给不同队列启动不同数量的worker

#下面启动一个worker执行`tracking`队列,两个worker执行`mailers`和`task`队列,另外两个worker给其他的任务
RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start

# 执行所有任务,然后退出
RAILS_ENV=production script/delayed_job start --exit-on-complete
#或者直接在前端执行
RAILS_ENV=production script/delayed_job run --exit-on-complete

Rials 4 需要替换script/delayed_jobbin/delayed_job worker可以运行在任何电脑上,只要它们能够访问数据库并且时钟保持同步。每个worker至少每5秒钟检查一次数据库。

可以使用rake jobs:work来开启任务,使用CTRL-C终止rake任务。 如果需要执行所有任务并退出,可以使用rake jobs:workoff 处理队列通过设置环境变量QUEUE或者QUEUES

$ QUEUE=tracking rake jobs:work
$ QUEUES=mailers,tasks rake jobs:work

重启delayed_job

重启delayed_job:

$ RAILS_ENV=production script/delayed_job restart

重启多个delayed_job workers:

$ RAILS_ENV=production script/delayed_job -n2 restart

Rials 4 需要替换script/delayed_jobbin/delayed_job

清除任务

使用rake jobs:clear删除队列中的所有任务

results matching ""

    No results matching ""